方式1 全注解
发送者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| /**
 * REST controller that publishes a test message to RabbitMQ.
 *
 * <p>Mapped under {@code /rabbit}; calling {@code /rabbit/send} publishes one message.
 */
@RestController
@RequestMapping("/rabbit")
public class MqSend {

    @Autowired
    private AmqpTemplate amqpTemplate;

    /**
     * Publishes the text {@code "now" + <current date>} through the two-argument
     * {@code convertAndSend}, passing {@code "myQueuebingExchange"} as its first argument,
     * then prints a confirmation line to standard output.
     */
    @RequestMapping("/send")
    public void test() {
        final String payload = "now" + new Date();
        amqpTemplate.convertAndSend("myQueuebingExchange", payload);
        System.out.println("监听队列中~~~~~~~~~~");
    }
}
|
消费者
1 需要手动创建队列（不推荐）
1 2 3 4 5 6 7 8
| @Slf4j @Component public class MqReceiver { @RabbitListener(queues = "myQueue") public void process(String message) { log.info("message={}", message); }
|
2 自动创建队列
1 2 3 4 5 6 7 8
| @Slf4j @Component public class MqReceiver { public void process(String message) { log.info("message={}", message); }
|
3 自动创建队列 并自动绑定交换机
1 2 3 4 5 6 7 8 9 10
| @Slf4j @Component public class MqReceiver { @RabbitListener(bindings = @QueueBinding(value = @Queue("myQueuebingExchange"), exchange = @Exchange("myExchange") )) public void process(String message) { log.info("message={}", message); }
|
4 自动创建队列 并自动绑定交换机自动绑定路由
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| @Slf4j @Component public class MqReceiver { @RabbitListener(bindings = @QueueBinding( value = @Queue("computerQueue"), exchange = @Exchange("computerExchange"), key = "computerkey" )) public void processComputer(String message) { log.info("message={}", message); }
@RabbitListener(bindings = @QueueBinding( value = @Queue("fruitQueue"), key = "furitKey", exchange = @Exchange("Fruitexchange") )) public void processFruit(String message) { log.info("message={}", message); }
|
发送方
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| /**
 * Publishes the text {@code "now" + <current date>} through the two-argument
 * {@code convertAndSend}, passing {@code "myQueuebingExchange"} as its first argument,
 * then prints a confirmation line.
 */
@Test
public void sendMessage() {
    final String payload = "now" + new Date();
    amqpTemplate.convertAndSend("myQueuebingExchange", payload);
    System.out.println("监听队列中");
}
/**
 * Publishes a timestamped message for the computer consumer.
 *
 * <p>The three-argument {@code convertAndSend} takes (exchange, routingKey, message), so
 * the first argument must be the exchange declared in the listener binding
 * ({@code computerExchange}), not the queue name.
 */
@Test
public void sendorder() {
    amqpTemplate.convertAndSend("computerExchange", "computerkey", "now" + new Date() + "dianzi");
    System.out.println("电子产品监听队列中~~~");
}
/**
 * Publishes a timestamped message for the fruit consumer.
 *
 * <p>The three-argument {@code convertAndSend} takes (exchange, routingKey, message), so
 * the first argument must be the exchange declared in the listener binding
 * ({@code Fruitexchange}), not the queue name.
 */
@Test
public void sendfruit() {
    amqpTemplate.convertAndSend("Fruitexchange", "furitKey", "now" + new Date() + "fruit");
    System.out.println("水果监听队列中~~~");
}
|
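⚠️ 注意：上述队列会由 @QueueBinding 自动创建并绑定到交换机上。三个参数的 convertAndSend(exchange, routingKey, message) 的第一个参数是交换机的名字，而不是队列名；发送方需要指定交换机和路由key，消息再由交换机按路由key投递到绑定的队列。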
===
方式2：配置文件的形式
* Direct模式 交换机Exchange.

配置文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| @Configuration public class MQConfig { public static final String QUEUE = "queue";
@Bean public Queue queue() { return new Queue(QUEUE, true); }
|
#### sender
1 2 3 4 5 6 7 8 9 10 11 12
| @Service public class MQSender {
private static Logger log = LoggerFactory.getLogger(MQSender.class); @Autowired AmqpTemplate amqpTemplate ; public void send(Object message) { amqpTemplate.convertAndSend(MQConfig.QUEUE, msg); log.info("send message:"+msg); }
|
receive
1 2 3 4 5 6
| @Service public class MQReceiver { @RabbitListener(queues=MQConfig.QUEUE) public void receive(String message) { log.info("receive message:"+message); }
|
Topic模式 交换机Exchange

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| @Configuration public class MQConfig {
public static final String TOPIC_QUEUE1 = "topic.queue1"; public static final String TOPIC_QUEUE2 = "topic.queue2";
public static final String TOPIC_EXCHANGE = "topicExchage";
@Bean public Queue topicQueue1() { return new Queue(TOPIC_QUEUE1, true); } @Bean public Queue topicQueue2() { return new Queue(TOPIC_QUEUE2, true); } @Bean public TopicExchange topicExchage(){ return new TopicExchange(TOPIC_EXCHANGE); } @Bean public Binding topicBinding1() { return BindingBuilder.bind(topicQueue1()).to(topicExchage()).with("topic.key1"); } @Bean public Binding topicBinding2() { return BindingBuilder.bind(topicQueue2()).to(topicExchage()).with("topic.#"); }
|
sender
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| @Service public class MQSender {
private static Logger log = LoggerFactory.getLogger(MQSender.class); @Autowired AmqpTemplate amqpTemplate ; public void sendTopic(Object message) { String msg = RedisService.beanToString(message); log.info("send topic message:"+msg); amqpTemplate.convertAndSend(MQConfig.TOPIC_EXCHANGE, "topic.key1", msg+"1"); amqpTemplate.convertAndSend(MQConfig.TOPIC_EXCHANGE, "topic.key2", msg+"2"); }
|
Receive
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| @Service public class MQReceiver { @RabbitListener(queues=MQConfig.TOPIC_QUEUE1) public void receiveTopic1(String message) { log.info(" topic queue1 message:"+message); } @RabbitListener(queues=MQConfig.TOPIC_QUEUE2) public void receiveTopic2(String message) { log.info(" topic queue2 message:"+message); }
|
Fanout模式

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| @Configuration public class MQConfig {
public static final String TOPIC_QUEUE1 = "topic.queue1"; public static final String TOPIC_QUEUE2 = "topic.queue2"; public static final String FANOUT_EXCHANGE = "fanoutxchage";
@Bean public Queue topicQueue1() { return new Queue(TOPIC_QUEUE1, true); } @Bean public Queue topicQueue2() { return new Queue(TOPIC_QUEUE2, true); }
@Bean public FanoutExchange fanoutExchage(){ return new FanoutExchange(FANOUT_EXCHANGE); } @Bean public Binding FanoutBinding1() { return BindingBuilder.bind(topicQueue1()).to(fanoutExchage()); } @Bean public Binding FanoutBinding2() { return BindingBuilder.bind(topicQueue2()).to(fanoutExchage()); }
|
sender
1 2 3 4 5 6 7
| /**
 * Converts the given object to a string, logs it, and publishes it to
 * {@code MQConfig.FANOUT_EXCHANGE} with an empty routing key.
 *
 * @param message the object to send; serialised with {@code RedisService.beanToString}
 *                (presumably to JSON - confirm against that helper)
 */
public void sendFanout(Object message) {
    final String payload = RedisService.beanToString(message);
    log.info("send fanout message:" + payload);
    amqpTemplate.convertAndSend(MQConfig.FANOUT_EXCHANGE, "", payload);
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
|
/** Declares the headers exchange named by {@code HEADERS_EXCHANGE}. */
@Bean
public HeadersExchange headersExchage() {
    return new HeadersExchange(HEADERS_EXCHANGE);
}

/** Declares the queue named by {@code HEADER_QUEUE}. */
@Bean
public Queue headerQueue1() {
    return new Queue(HEADER_QUEUE, true);
}

/**
 * Binds the header queue to the headers exchange; a message matches only when both
 * {@code header1=value1} and {@code header2=value2} are present ({@code whereAll}).
 */
@Bean
public Binding headerBinding() {
    final Map<String, Object> requiredHeaders = new HashMap<>();
    requiredHeaders.put("header1", "value1");
    requiredHeaders.put("header2", "value2");
    return BindingBuilder
            .bind(headerQueue1())
            .to(headersExchage())
            .whereAll(requiredHeaders)
            .match();
}
|
sender
1 2 3 4 5 6 7 8 9
| /**
 * Converts the given object to a string and publishes it to
 * {@code MQConfig.HEADERS_EXCHANGE} as a raw {@code Message} carrying the headers
 * {@code header1=value1} and {@code header2=value2}.
 *
 * @param message the object to send; serialised with {@code RedisService.beanToString}
 *                (presumably to JSON - confirm against that helper)
 */
public void sendHeader(Object message) {
    String msg = RedisService.beanToString(message);
    // The log text used to say "fanout", copied from the fanout sender.
    log.info("send header message:" + msg);
    MessageProperties properties = new MessageProperties();
    properties.setHeader("header1", "value1");
    properties.setHeader("header2", "value2");
    // NOTE(review): getBytes() uses the platform default charset; consider an explicit
    // charset agreed with the receiver.
    Message obj = new Message(msg.getBytes(), properties);
    amqpTemplate.convertAndSend(MQConfig.HEADERS_EXCHANGE, "", obj);
}
|
Receive
1 2 3 4
| /**
 * Logs every message delivered from {@code MQConfig.HEADER_QUEUE}.
 *
 * @param message the raw message body; decoded with the platform default charset
 */
@RabbitListener(queues=MQConfig.HEADER_QUEUE)
public void receiveHeaderQueue(byte[] message) {
    final String text = new String(message);
    log.info(" header queue message:" + text);
}
|
Author:
John Doe
Permalink:
http://yoursite.com/2019/08/10/消息队列/RabbitMQ/RabbitMQ整合/RabbitMq 慕课/
License:
Copyright (c) 2019 CC-BY-NC-4.0 LICENSE
Slogan:
Do you believe in DESTINY?